package cn.tnar.yunpark.io;

import cn.tnar.yunpark.util.Util;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;

/**
 * 创建一个Flowable，监听指定的UDP端口，收到数据时发射UDP包及socket
 * Created by tieh on 2017/6/20.
 */
public class FlowableUdpServer {
    private static final Logger log = LoggerFactory.getLogger(FlowableUdpServer.class);

    public static Flowable<UdpSocketData> create(String localIp, int port) {
        return Flowable.create(new UdpProcessor(localIp, port), BackpressureStrategy.BUFFER);
    }

    private static class UdpProcessor implements FlowableOnSubscribe<UdpSocketData> {

        private String localIp;
        private int port;

        private UdpProcessor(String localIp, int port) {
            this.localIp = localIp;
            this.port = port;
        }

        @Override
        public void subscribe(FlowableEmitter<UdpSocketData> e) throws Exception {
            try {
                DatagramSocket socket;
                if(localIp == null) {
                    socket = new DatagramSocket(port);
                } else {
                    socket = new DatagramSocket(new InetSocketAddress(localIp, port));
                }
                e.setCancellable(socket::close);
                do {
                    DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
                    socket.receive(packet);
                    log.debug(packet.getSocketAddress().toString() + " <= " + Util.toHex(packet.getData(), packet.getLength()));
                    e.onNext(new UdpSocketData(socket, packet));
                } while (!e.isCancelled());
            } catch (Exception err) {
                e.onError(err);
            }
        }
    }
}
